@@ -308,6 +308,10 @@ public enum OperationStatusCode {
/** Like the previous, but for old logs that are about to be deleted */
public static final String HREGION_OLDLOGDIR_NAME = "oldWALs";

/** Separate old log into different dir by regionserver name **/
public static final String SEPERATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
public static final boolean DEFAULT_SEPERATE_OLDLOGDIR = false;

/** Staging dir used by bulk load */
public static final String BULKLOAD_STAGING_DIR_NAME = "staging";
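For orientation, here is a minimal sketch (not part of this patch) of how the new flag could be toggled through the standard Configuration API. The constant names come from the hunk above; the class name and main method are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

// Illustrative only: enable the per-regionserver oldWALs layout programmatically,
// equivalent to setting hbase.separate.oldlogdir.by.regionserver=true in hbase-site.xml.
public class EnableSeparateOldLogDirExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.SEPERATE_OLDLOGDIR, true);
    // Reads back true; the default defined above is false.
    System.out.println(conf.getBoolean(HConstants.SEPERATE_OLDLOGDIR,
        HConstants.DEFAULT_SEPERATE_OLDLOGDIR));
  }
}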

@@ -42,7 +42,7 @@ public class ReplicationQueueInfo {
private final String peerClusterZnode;
private boolean queueRecovered;
// List of all the dead region servers that had this queue (if recovered)
private List<String> deadRegionServers = new ArrayList<>();
private List<ServerName> deadRegionServers = new ArrayList<>();

/**
* The passed znode will be either the id of the peer cluster or
@@ -66,7 +66,7 @@ public ReplicationQueueInfo(String znode) {
* cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-&lt;server name>-...
*/
private static void
extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
extractDeadServersFromZNodeString(String deadServerListStr, List<ServerName> result) {

if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;

@@ -85,7 +85,7 @@ public ReplicationQueueInfo(String znode) {
if (i > startIndex) {
String serverName = deadServerListStr.substring(startIndex, i);
if(ServerName.isFullServerName(serverName)){
result.add(serverName);
result.add(ServerName.valueOf(serverName));
} else {
LOG.error("Found invalid server name:" + serverName);
}
@@ -103,7 +103,7 @@ public ReplicationQueueInfo(String znode) {
if(startIndex < len - 1){
String serverName = deadServerListStr.substring(startIndex, len);
if(ServerName.isFullServerName(serverName)){
result.add(serverName);
result.add(ServerName.valueOf(serverName));
} else {
LOG.error("Found invalid server name at the end:" + serverName);
}
@@ -112,7 +112,7 @@ public ReplicationQueueInfo(String znode) {
LOG.debug("Found dead servers:" + result);
}

public List<String> getDeadRegionServers() {
public List<ServerName> getDeadRegionServers() {
return Collections.unmodifiableList(this.deadRegionServers);
}
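As a small aside, the String-to-ServerName conversion the hunks above now perform can be shown as a self-contained sketch (not part of this patch); the sample value follows the host,port,startcode form from the javadoc above, and the class name is made up.

import org.apache.hadoop.hbase.ServerName;

// Illustrative only: parse one entry of the dead-server list the same way
// extractDeadServersFromZNodeString() now does.
public class DeadServerNameParseExample {
  public static void main(String[] args) {
    String raw = "ip-10-46-221-101.ec2.internal,52170,1364333181125";
    if (ServerName.isFullServerName(raw)) {
      ServerName sn = ServerName.valueOf(raw);
      // Typed accessors become available once the string is a ServerName.
      System.out.println(sn.getHostname() + " / " + sn.getPort() + " / " + sn.getStartcode());
    }
  }
}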

@@ -142,7 +142,12 @@ protected void chore() {
}
}

private void preRunCleaner() {
cleanersChain.stream().forEach(FileCleanerDelegate::preClean);
}

public Boolean runCleaner() {
preRunCleaner();
try {
FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
checkAndDeleteEntries(files);
@@ -44,4 +44,10 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
* this method is used to pass some instance into subclass
* */
void init(Map<String, Object> params);
}

/**
 * Used to do some initialization work before each periodic cleaning run
*/
default void preClean() {
}
}
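To show what the new hook is for, here is a hypothetical delegate (not part of this patch; all names illustrative) that uses preClean() the way CleanerChore.preRunCleaner() above intends: refresh cached state once at the start of a cleaner run, then filter the candidate files against that snapshot in getDeletableFiles().

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.master.cleaner.FileCleanerDelegate;

// Hypothetical delegate: only files that already existed when the run started are deletable.
public class SnapshotAwareCleanerDelegate implements FileCleanerDelegate {
  private Configuration conf;
  private volatile boolean stopped = false;
  private volatile long snapshotTime;

  @Override
  public void preClean() {
    // Called once per cleaner run by preRunCleaner(), before any per-file checks.
    snapshotTime = System.currentTimeMillis();
  }

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    List<FileStatus> deletable = new ArrayList<>();
    for (FileStatus f : files) {
      if (f.getModificationTime() < snapshotTime) {
        deletable.add(f);
      }
    }
    return deletable;
  }

  @Override
  public void init(Map<String, Object> params) {
    // No extra instances needed for this sketch.
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void stop(String why) {
    stopped = true;
  }

  @Override
  public boolean isStopped() {
    return stopped;
  }
}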
@@ -797,7 +797,10 @@ private static void split(final Configuration conf, final Path p) throws IOExcep
}

final Path baseDir = FSUtils.getWALRootDir(conf);
final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(HConstants.SEPERATE_OLDLOGDIR, HConstants.DEFAULT_SEPERATE_OLDLOGDIR)) {
archiveDir = new Path(archiveDir, p.getName());
}
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}

@@ -1141,10 +1144,10 @@ private static void usage() {
System.err.println("Arguments:");
System.err.println(" --dump Dump textual representation of passed one or more files");
System.err.println(" For example: "
+ "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
+ "FSHLog --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
System.err.println(" --split Split the passed directory of WAL logs");
System.err.println(
" For example: " + "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
" For example: " + "FSHLog --split hdfs://example.com:9000/hbase/WALs/DIR");
}

/**
@@ -26,22 +26,19 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.zookeeper.KeeperException;

/**
@@ -54,23 +51,31 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
private ZooKeeperWatcher zkw;
private ReplicationQueuesClient replicationQueues;
private boolean stopped = false;

private Set<String> wals;
private long readZKTimestamp = 0;

@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
// all members of this class are null if replication is disabled,
// so we cannot filter the files
if (this.getConf() == null) {
return files;
}

final Set<String> wals;
public void preClean() {
readZKTimestamp = EnvironmentEdgeManager.currentTime();
try {
// The concurrently created new WALs may not be included in the return list,
// but they won't be deleted because they're not in the checking set.
wals = replicationQueues.getAllWALs();
} catch (KeeperException e) {
LOG.warn("Failed to read zookeeper, skipping checking deletable files");
wals = null;
}
}

@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
// all members of this class are null if replication is disabled,
// so we cannot filter the files
if (this.getConf() == null) {
return files;
}

if (wals == null) {
return Collections.emptyList();
}
return Iterables.filter(files, new Predicate<FileStatus>() {
@@ -85,8 +90,9 @@ public boolean apply(FileStatus file) {
LOG.debug("Didn't find this log in ZK, deleting: " + wal);
}
}
return !logInReplicationQueue;
}});
return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
}
});
}

@Override
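The combination of the ZK snapshot taken in preClean() and the modification-time guard added above can be summarized in a small self-contained sketch (not part of this patch): a WAL is deletable only if it is missing from the snapshot and already existed when the snapshot was taken, so WALs created after the snapshot are never deleted even though the snapshot cannot contain them yet.

import java.util.HashSet;
import java.util.Set;

// Illustrative only: the deletability rule ReplicationLogCleaner now applies per file.
public class WalDeletabilityRuleExample {
  static boolean isDeletable(Set<String> walsInZkSnapshot, long readZKTimestamp,
      String walName, long walModificationTime) {
    boolean inReplicationQueue = walsInZkSnapshot.contains(walName);
    return !inReplicationQueue && walModificationTime < readZKTimestamp;
  }

  public static void main(String[] args) {
    Set<String> snapshot = new HashSet<>();
    long readZKTimestamp = 1000L;
    // Created after the snapshot: kept, even though it is missing from the set.
    System.out.println(isDeletable(snapshot, readZKTimestamp, "new-wal", 2000L)); // false
    // Existed before the snapshot and referenced by no queue: deletable.
    System.out.println(isDeletable(snapshot, readZKTimestamp, "old-wal", 500L));  // true
  }
}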
@@ -36,6 +36,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -58,7 +59,6 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;

import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AtomicLongMap;

/**
@@ -356,7 +356,7 @@ private String formatQueue(String regionserver, ReplicationQueues replicationQue

StringBuilder sb = new StringBuilder();

List<String> deadServers ;
List<ServerName> deadServers;

sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
sb.append(" Queue znode: " + queueId + "\n");
@@ -385,6 +385,7 @@ private String formatQueue(String regionserver, ReplicationQueues replicationQue
}
return sb.toString();
}

/**
* return total size in bytes from a list of WALs
*/
@@ -29,7 +29,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
@@ -51,10 +52,10 @@ public class RecoveredReplicationSource extends ReplicationSource {

@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode,
clusterId, replicationEndpoint, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
@@ -98,7 +99,7 @@ public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOExc
}
// Path changed - try to find the right path.
hasPathChanged = true;
if (stopper instanceof ReplicationSyncUp.DummyServer) {
if (server instanceof ReplicationSyncUp.DummyServer) {
// In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
// from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
Path newPath = getReplSyncUpPath(path);
@@ -107,12 +108,13 @@ public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOExc
} else {
// See if Path exists in the dead RS folder (there could be a chain of failures
// to look at)
List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
LOG.info("NB dead servers : " + deadRegionServers.size());
final Path walDir = FSUtils.getWALRootDir(conf);
for (String curDeadServerName : deadRegionServers) {
for (ServerName curDeadServerName : deadRegionServers) {
final Path deadRsDirectory =
new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
.getServerName()));
Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
for (Path possibleLogLocation : locs) {
@@ -189,4 +191,9 @@ public void tryFinish() {
public String getPeerId() {
return this.actualPeerId;
}

@Override
public ServerName getServerWALsBelongTo() {
return this.replicationQueueInfo.getDeadRegionServers().get(0);
}
}
@@ -40,6 +40,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -59,7 +61,6 @@
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;

import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;


@@ -94,7 +95,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// The manager of all sources to which we ping back our progress
protected ReplicationSourceManager manager;
// Should we stop everything?
protected Stoppable stopper;
protected Server server;
// How long should we sleep for each retry
private long sleepForRetries;
protected FileSystem fs;
@@ -139,7 +140,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @param conf configuration to use
* @param fs file system to use
* @param manager replication manager to ping to
* @param stopper the atomic boolean to use to stop the regionserver
* @param server the server for this region server
* @param peerClusterZnode the name of our znode
* @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
@@ -148,10 +149,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
*/
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.stopper = stopper;
this.server = server;
this.conf = HBaseConfiguration.create(conf);
this.waitOnEndpointSeconds =
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
@@ -330,7 +331,7 @@ public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
stopper.stop("Unexpected exception in " + t.getName());
server.stop("Unexpected exception in " + t.getName());
}
};
}
@@ -500,7 +501,7 @@ public Path getCurrentPath() {

@Override
public boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
return !this.server.isStopped() && this.sourceRunning;
}

/**
@@ -564,4 +565,9 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
public WALFileLengthProvider getWALFileLengthProvider() {
return walFileLengthProvider;
}

@Override
public ServerName getServerWALsBelongTo() {
return server.getServerName();
}
}
@@ -26,7 +26,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -48,13 +49,13 @@ public interface ReplicationSourceInterface {
* @param manager the manager to use
* @param replicationQueues
* @param replicationPeers
* @param stopper the stopper object for this region server
* @param server the server for this region server
* @param peerClusterZnode
* @param clusterId
* @throws IOException
*/
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;

@@ -163,4 +164,11 @@ void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pai
* @param batchSize entries size pushed
*/
void postShipEdits(List<Entry> entries, int batchSize);

/**
 * The queue of WALs only belongs to one region server. This will return the name of the server
 * that all the WALs in the queue belong to.
 * @return the server name that all WALs belong to
*/
ServerName getServerWALsBelongTo();
}
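Finally, a hypothetical caller (not part of this patch) showing how getServerWALsBelongTo() can be combined with the new flag: assuming the per-regionserver layout places archived WALs under oldWALs/<servername>, as the split() change above suggests, a reader can resolve where a source's archived WAL is expected to live. The class and method names are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;

// Hypothetical helper: resolve the expected archive location of a WAL for a given source.
public final class ArchivedWalLocatorExample {
  private ArchivedWalLocatorExample() {
  }

  public static Path archivedWalPath(Configuration conf, Path walRootDir,
      ReplicationSourceInterface source, String walName) {
    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    if (conf.getBoolean(HConstants.SEPERATE_OLDLOGDIR, HConstants.DEFAULT_SEPERATE_OLDLOGDIR)) {
      // Per-regionserver layout: oldWALs/<host,port,startcode>/<walName>
      oldLogDir = new Path(oldLogDir, source.getServerWALsBelongTo().getServerName());
    }
    return new Path(oldLogDir, walName);
  }
}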